7장 맵리듀스 타입과 포맷

  1. 7장 맵리듀스 타입과 포맷
    1. 1 맵리뷰스 타입
      1. 기본적인 맵리뷰스 잡
      2. 기본 스트리밍 잡
      3. 스트리밍 키와 값
    2. 2 입력 포맷
      1. 입력 스플릿과 레코드
        1. 입력 스플릿
        2. InputFormat
        3. MapRunner
        4. FileInputFormat
        5. FileInputFormat 입력 경로
        6. FileInputFormat 입력 스플릿
        7. 작은 파일과 CombineFileInputFormat
        8. 스플릿 방지
        9. 전체 파일을 하나의 레코드로 처리하기
      2. 텍스트 입력
        1. TextInputFormat
        2. KeyValueTextInputFormat
        3. NLineInputFormat
        4. XML
      3. 바이너리 입력
        1. SequenceFileInputFormat
        2. SequenceFileAsTextInputFormat
        3. SequenceFileAsBinaryInputFormat
      4. 다중 입력
      5. 데이터베이스 입력과 출력
    3. 3 출력 포맷
      1. Text Output
      2. 바이너리 출력
        1. SequenceFileOutputFormat
        2. SequenceFileAsBinaryOutputFormat
        3. MapFileOutputFormat
      3. Multiple Outputs
        1. MultipleOutputFormat
        2. MultipleOutputs
      4. 느린 출력
      5. Database Output

1 맵리뷰스 타입

Map & Reduce 함수 형식은 다음과 같다.
입력 -> 중간 -> 출력


map : (K1, V1) -> list(K2, V2)
combine : (K2, list(V2)) -> list(K2, V2)
reduce : (K2, list(V2)) -> list(K3, V3)

  • 맵 입력키와 값 타입은 맵 출력 타입과 다르다.
  • 리듀의 입력 타입은 맵의 출력 타입과 같아야 한다.
  • combine함수
    • 출력타입은 맵과 리듀스의 중간 키와 값에 해당하는 타입(K2, V2) 이 되어 리뷰수 함수의 입력값이 됨.
      *대부분 combine과 reduce함수는 타입이 같은 경우가 많다.
  • partition 함수는 중간 키와 값(K2, V2)에 대해서 동작하며, 파티션번호 반환한다.
    • 실제로 파티션은 키에 의해서 결정됨.

partition : (K2, V2) -> 정수  * 적어도 나는 쓸 일이 없었다.

  • 입력타입은 InputFormat클래스를 통해서 정해짐( setInputFormat() )
    e.g.
    • TextInputFormat은 기본적으로 중간타입을 LongWritable 키와 Text값로 생성한다.
      (다른 타입은 JobConf 의 메소드를 호출하여 명시적으로 설정한다)
    • K2, K3가 같다면 setMapOutputKeyClass() 호출 필요없음.
    • setOutputKeyClass()에 정의된 타입으로 재정의 되기 때문임.
    • V2, V3가 같으면 setOutputKeyClass()만 사용하면 된다.
      @ Writable클래스는 p.138참조
      @ Java primitive type과 매핑
  • 그럼에도 중간과 최종 출력 타입을 설정하는 메소드가 모두 존재하는 이유는?
    • 자바 제네릭때문 - 타입 삭제(Type Erasure) 기능 때문에 runtime에 타입 정보가 항상 존재하지 않는다.
      따라서, 명시적으로 타입정보를 하둡에 제공해야 한다.
    • 호환되지 않는 타입으로 맵리듀스 잡을 설정할 수 있음을 의미
    • 설정된 타입은 Compile시 체크되지 않고 Job 실행 중 런타임에 발견됨
      v 이 때문에 먼저 작은 데이터로 테스트 잡을 실행 해 보면 좋음.
      (( Type Erasure : JVM레벨 호환성 유지 위해 Compile시 제네릭 타입 정보를 삭제해버리는 기능 ))

기본적인 맵리뷰스 잡

[STUDY:예제 7-1]

MinimalMapReduceWithDedaults.java


public class MinimalMapReduceWithDedaults extends Configured implements Tool {

     public int run(String[] args) throws Exception {
          JobConf conf = new JobConf(getConf(), this.getClass());
         
          if (conf == null) {
               return -1;
          }
         
          FileInputFormat.addInputPath(conf, new Path(args[0]));
          FileOutputFormat.setOutputPath(conf, new Path(args[1]));
         
          conf.setInputFormat(TextInputFormat.class);
         
          /* 맵 타스크 수를 1로 설정하지 않으며 단지 힌트임.
           * 실제로는 입력 데이트 크기, 파일 블록 크기에 의해 결정됨.
           */
          conf.setNumMapTasks(1);
          /*
           * 맵 입력키와 출력키가 같은 타입이고, 맵 입력값과 출력값이 같은 타입이면
           * 키와 값의 타입에 상관없이 잘 동작한다.
           */
          conf.setMapperClass(IdentityMapper.class);          
          conf.setMapRunnerClass(MapRunner.class);          // 각 레코드를 순차적으로 읽어서  매퍼의 map()호출
         
          conf.setMapOutputKeyClass(LongWritable.class);     // 맵 출력키
          conf.setMapOutputValueClass(Text.class);          // 맵 출력값
         
          /*
           * 레코드 키를 해시해서 레코드가 어떤 파티션에 속하는 지 결정.
           * 각 파티션은 reduce task에 의해 처리.
           * Job 내 파티션 개수는 reduce task 개수와 같다. 
           */
          conf.setPartitionerClass(HashPartitioner.class);
         
          conf.setNumReduceTasks(1);
          conf.setReducerClass(IdentityReducer.class);     // 변경없이 모든 입력데이터를 출력으로 보냄
          conf.setOutputKeyClass(LongWritable.class);          // 리듀스 출력키
          conf.setOutputValueClass(Text.class);               // 리듀스 출력값
          /*
           * 기본 출력 포맷
           * 키/값을 문자열로 변환하고 구분자는 탭을 쓰면서 레코드를 라인단위로 기록.
           */
          conf.setOutputFormat(TextOutputFormat.class);
         
          JobClient.runJob(conf);
          return 0;
     }

}


키의 해시함수가 잘 동작한다면,
1) 모든 레코드가 전체 리듀스 타스크에 고르게 분산되며
2) 같은 키를 갖는 모든 레코드는 같은 리듀스 타스크가 처리한다.

기본 스트리밍 잡

이건 언제 쓰나?
입력 데이터를 정렬하는 효과를 낸다.

스트리밍 키와 값

스트리밍 응용 프로그램은 구분자를 제어할 수 있다.
[STUDY:표 7-2] 스트리밍 분리자 속성 참조
기본은 탭 문자, 키와 값에 탭 문자가 있을 때 변경하는 것이 바람직하다.
첫 번째 필드에 다른 필드를 더하여 키를 만들어 출력할 수 있다.

2 입력 포맷

입력 스플릿과 레코드

  • 입력 스플릿은 하나의 맵에서 처리하는 입력 파일의 청크이다.
  • 각 맵 프로세스는 단일 스플릿이다.
  • 각 스플릿은 레코드들로 나누어진다.
  • 맵은 각 레코드(Key / Value pair)를 차례로 처리한다.
  • 스플릿 == 테이블의 일정 범위에 있는 행의 집합
  • 레코드 == 하나의 행에 해당된다.
    e.g. RDB에서 데이터는 읽는 입력 포맷인 DBInputFormat은 정확하게 이러한 개념을 사용한다.

입력 스플릿

Java Interface인 InputSplit에 의해 표현되며, 두 개의 멤버변수를 가진다.
! 데이터에 대한 참조 객체이지 입력 데이터를 포함하지 않는다.


public interface InputSplit extends Writable {
    long getLength() throws IOException;          // 바이트로 된 길이

    String[] getLocations()  throws IOException; // 호스트네임 문자열로 된 저장소 위치
}

  • 저장소 위치는 맵리듀스 시스템에 의해 사용된다.
  • 시스템은 가능한 한 가장 가까운 곳에 타스크를 배치시킨다.
  • 스플릿을 큰 것부터 순서를 매겨 처리하기 위해 바이트 길이를 사용한다.
  • 이렇게 하면 Job 수행 시간을 최소화할 수 있다.
  • InputSplit을 직접 다룰 필요없다. InputFormat에 의해 생성되기 때문이다.

InputFormat

  • 입력 스플릿을 생성한다.
  • 스플릿을 레코드로 나누는 일을 한다.

public interface InputFormat<K, V> {
    // numSplits 로 원하는 맵타스크 개수를 설정한다. 임의의 수를 반환하기 때문에 힌트로서만 사용된다.
    InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;

    RecordReader<K, V> getRecordReader(InputSplit split,
                                                                              JobConf job,
                                                                              Reporter reporter) throws IOException;
}

  • 스플릿 크기나 개수가 InputFormat 구현체에 의해 계산되면 클라이언트는 스플릿 정보를 JobTacker에 보낸다.
  • JobTracker는 스플릿 위치정보로 TaskTracker 상에서 스플릿을 처리할 맵타스크를 스케쥴링한다.
  • TaskTracker에서 맵타스크는 Split의 RecordReader를 얻기 위해 getRrecordReader()로 Split을 전달한다.
  • 맵타스크는 맵 함수에 전달할 Key/Value 쌍의 레코드를 생성하기 위해 RecordReader를 사용한다.

MapRunner

매퍼를 실행시키는 유일한 방법.
MultithreadedMapRunner 는 매퍼를 설정된 개수의 thread 개수만큼 동시에 실행시킨다.

FileInputFormat

원본 데이터로(p.279 [STUDY:그림 7-2]) 파일을 사용하는 모든 InputFormat구현의 기본 클래스이다.
1) Job의 입력 파일 경로
2) 입력 파일의 스플릿을 생성하는 구현체
를 제공한다.

FileInputFormat 입력 경로

JobConf에 입력 경로를 지정할 수 있도록 4개의 정적 메소드를 제공한다.


public static void addInputPath(JobConf conf, Path path) 
public static void addInputPaths(JobConf conf, String commaSeparatedPaths) 
public static void setInputPath(JobConf conf, Path... inputPaths) 
public static void setInputPath(JobConf conf, String commaSeparatedPaths) 

  • add는 경로를 추가하고 set은 대체한다.

FileInputFormat 입력 스플릿

위에서 본 2번 FileInputFormat이 어떻게 스플릿을 만들어 낼까?

  • 오직 큰 파일만 분리(HDFS 블록보다 큰 파일을 의미)
  • 스플릿 크기는 대부분 HDFS 블록의 크기와 같다.
    [STUDY:표 7-4] 스플릿 크기를 제어하는 속성
  • 최소 스플릿 크기는 보통 1byte이며, 일부 포맷은 스플릿 크기에 대한 최소 크기 제한이 있다(e.g. SequeceFile)
  • 응용 프로그램이 최소 스플릿 크기를 부과할 수 있다.
    • 블록 크기보다 더 큰 값을 설정하면 블록보다 더 크게 스플릿을 만들 수 있다.
      ! 부작용이 더 크네요. 맵타스크에 할당되는 블록 중 로컬에 조재하는 경우가 많아질 뿐이다.
  • 최대 크기는 블록크기보다 작을 효과가 있다.
  • 스클릿 크기 계산식

max( 최소크기, min(최대크기, 블록크기 ) )
최소크기 < 블록크기 < 최대크기

그래서 스플릿 크기는 블록크기이다.

작은 파일과 CombineFileInputFormat

  • Hadoop 성능은 적은 수의 큰 파일처리가 좋다.
    • 작은 입력 데이터가 많으면 많을수록 추가적인 bookkeeping overhead발생한다.
  • 작은 파일과 잘 동작하도록 고안된 CombineFileInputFormat을 사용하면 어느 정도 해결된다.
    • CombineFileInputFormat
      1. FileInputFormat이 파일 당 스플릿을 생성했다면
        CombineFileInputFormat은 파일을 같은 스플릿으로 묶는다.
      2. 어떤 블록을 같은 스플릿에 배치할 지는 노드와 랙의 위치를 고려한다.
      3. 일반적인 맵리뷰스 잡에서 입력 데이터를 처리할 수 있는 속도와 비슷하게 만든다.
      4. 추상클래스이므로 getRecordReader() 를 구현해야 한다.

스플릿 방지

필요한 시점
e.g. 파일의 모든 레코드가 정렬되었는지 확인해야 할 때 하나의 맵이 파일 전체를 처리할 때 동작할 것이다.

  1. 최소 스플릿크기를 시스템의 가장 큰 파일보다 더 크게 설정하는 것이다(Long.MAX_VALUE로 설정)
  2. FileInputFormat 하위 구현클래스의 하위 클래스를 생성하고 isSplitable()을 return false하도록 override.

전체 파일을 하나의 레코드로 처리하기

WholeFileInputFormat Code 참조
파일 스플릿을 취해서 단일 레코드로 변환하는 일을 하며,
키는 null, 값은 전체 파일 내용이다.

텍스트 입력

TextInputFormat

  • 키는 LongWritable로서 파일 내에서 라인 시작 지점의 바이트 오프셋 주소
  • 값은 라인의 내용이며 라인피드나 캐리지리턴은 제외되고 Text객체로서 패키지 된다.

KeyValueTextInputFormat

기본 OutputFormat인 TextOutputFormat에 의해 만들어지는 출력물이 있는데
정확하게 해석하기 위해서는 KeyValueTextInputFormat이 적절하다.


(line1, On the top of the Crumpetty Tree)
(line2, The Quangle Wangle set,)
(line3, But his face you could not see,)
(line4, On account of his Beaver Hat.)

NLineInputFormat

  • 라인개수는 스플릿 크기와 라인길에 좌우된다.
  • 매퍼가 고정된 개수의 입력 라인을 받을 싶을 때 사용한다.

XML

연속돈 레코드(XML문서 조작들)로 구성된 큰 XML문서는 문자열이나 정규표현식을 이용하여
레코드의 시작과 종료 태그를 찾아서 부분 레코드로 나눌 수 있다
StreamXmlRecordReader 를 사용하면 페이지 요소는 매퍼에 의해 처리될 레코드로서 인식될 수 있다.

바이너리 입력

SequenceFileInputFormat

  • 키와 값은 sequence파일에 의해서 정해지며 맵 입력 유형이 일치하는지 확인해야 한다.

SequenceFileAsTextInputFormat

sequence파일의 키와 값을 Text객체로 변환시킨다.

SequenceFileAsBinaryInputFormat

sequence파일의 키와 값을 임의의 바이너리 객체로 가져온다.

다중 입력

하나는 탭으로 분리된 평범한 텍스트이고 다른 하나는 바이너리 순차 파일이다.
같은 포맷이지만 다른 외형을 가지고 있어서 다른 방식으로 해석해야 한다.
이런 경우, MultipleInput클래스를 사용한다.

  • 경로별로 사용할 InputFormat과 매퍼를 지정할 수 있다.

MultipleInputs.addInputPath(conf, ncdcInputPath, 
     TextInputFormat.class, MaxTemperaturerMapper.class);
MultipleInputs.addInputPath(conf, metOfficeInputPath, 
     TextInputFormat.class, MetOfficeMaxTemperatuerMapper.class);

데이터베이스 입력과 출력

DBInputFormat 은 관계형 데이터베이스에서 JDBC를 이용하여 데이터를 읽는 입력 포맷이다.
관계형 데이터베이스와 HDFS간 데이터를 옮기는 또 다른 방법으로 스쿱(15장에서)을 고려해 볼 수 있다.

3 출력 포맷

Text Output

  • TextOutputFormat은 기본 출력 포맷으로 레코드를 텍스트의 라인으로 기록한다.
  • 키와 값은 어떤 타입이라도 될 수 있다.
  • 키/값 쌍은 탭 문자로 분리
  • 이 구분자는 mapred.textoutputformat.separator 속성을 이용해서 변경 가능.
  • NullWritable 타입을 이용해서 출력물로부터 키와 값을 억제할 수 있다(아무것도 출력하지 않게 말이다)

바이너리 출력

SequenceFileOutputFormat

  • 시퀀스 파일로 출력물을 쓴다.
  • 쉽게 압축할 수 있기 때문에 이후 추가로 맵리듀스 잡을 수행할 때 output을 input으로 사용한다면 좋은 선택이 될 수 있다.

SequenceFileAsBinaryOutputFormat

  • SequenceFileAsBinaryInputFormat 대응
  • 원시 바이너리 포맷으로 키와 값을 SequenceFile 컨테이너에 쓴다.

MapFileOutputFormat

  • MapFiles을 출력으로 쓴다.

Multiple Outputs

  • FileOutputFormat과 하위 클래스는 출력 디렉토리에 파일의 집합을 다음과 같이 생성한다.
    • 리뷰서당 한 개의 파일
    • 파일들은 파티션번호(part-00000, part-00001,...)처럼 이름 지어진다.
  • 경우에 따라 파일이름을 제어할 필요가 있거나 리듀서 당 다수 파일을 생성해야 하는 경우가 있다.
  • MultipleOutputFormat과 MultipleOutputs 사용

MultipleOutputFormat

데이터를 여러 개의 파일에 쓸 수 있도록 해 주며 파일명은 출력키와 값으로부터 파생될 수 있다.

MultipleOutputs

  • 각 출력에 대해 다른 타입들을 만들어 낼 수 있다.
  • 반면, 출력물 이름에 대한 제어력은 더 낮다.
  • MultipleOutputFormat과 MultipleOutputs의 차이
기능MultipleOutputFormatMultipleOutputs
파일과 디렉토리 이름에 대한 완력한 제어YesNo
다른 산출물에서는 다른 타입의 키와 값 사용 가능NoYes
같은 잡에서 맵과 리듀스에 의해 사용NoYes
레코드별 다중 출력물 생성NoYes
어떤 Outputformat도 사용가능No(SubClass화)Yes

느린 출력

  • FileOutputForm의 하위클래스는 내용이 없더라고 출력파일(part-nnnnn)을 생성한다.
  • 빈 파일이 생성되길 원치 않을 때 LazyOutputFormat이 도움을 줄 수 있음(0.21.0이상)
  • Streaming과 Pipes는 LazyOutputFormat을 사용할 수 있도록 lazyOutput 옵션 제공함.

Database Output

TableOutputFormat은 맵리듀스 결과물을 HBase 테이블로 쓰기 위한 출력 포맷이다.
p.296참조